from kafka import KafkaConsumer
import json


def simple_consumer():
    # 创建消费者
    consumer = KafkaConsumer(
        'monitor-data-topic',
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='earliest',  # 从最早的消息开始消费
        enable_auto_commit=True,  # 自动提交偏移量
        group_id='my-group',  # 消费者组ID
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),

        # SASL认证配置
        security_protocol='SASL_PLAINTEXT',  # 使用SASL_PLAINTEXT协议
        sasl_mechanism='PLAIN',              # 使用PLAIN认证机制
        sasl_plain_username='admin',  # 替换为你的用户名
        sasl_plain_password='admin123'   # 替换为你的密码
    )

    print("等待消息...")
    try:
        for message in consumer:
            print(f"主题: {message.topic}")
            print(f"分区: {message.partition}")
            print(f"偏移量: {message.offset}")
            print(f"消息内容: {message.value}")
            print("-" * 50)
    except KeyboardInterrupt:
        print("停止消费")
    finally:
        consumer.close()


if __name__ == "__main__":
    simple_consumer()